UDT协议实现分析——UDT Socket的创建

UDT API的用法

在分析 连接的建立过程 之前,先来看一下UDT API的用法。在UDT网络中,通常要有一个UDT Server监听在某台机器的某个UDP端口上,等待客户端的连接;有一个或多个客户端连接UDT Server;UDT Server接收到来自客户端的连接请求后,创建另外一个单独的UDT Socket用于与该客户端进行通信。

先来看一下UDT Server的简单的实现,UDT的开发者已经提供了一些demo程序可供参考,位于app/目录下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#include <unistd.h>
#include <cstdlib>
#include <cstring>
#include <netdb.h>
#include <iostream>
#include <udt.h>
using namespace std;
void* recvdata(void*);
struct UDTUpDown {
UDTUpDown() {
// use this function to initialize the UDT library
UDT::startup();
}
~UDTUpDown() {
// use this function to release the UDT library
UDT::cleanup();
}
};
int main(int argc, char* argv[]) {
if ((1 != argc) && ((2 != argc) || (0 == atoi(argv[1])))) {
cout << "usage: appserver [server_port]" << endl;
return 0;
}
// Automatically start up and clean up UDT module.
UDTUpDown _udt_;
addrinfo hints;
addrinfo* res;
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_flags = AI_PASSIVE;
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
//hints.ai_socktype = SOCK_DGRAM;
string service("9000");
if (2 == argc)
service = argv[1];
if (0 != getaddrinfo(NULL, service.c_str(), &hints, &res)) {
cout << "illegal port number or port is busy.\n" << endl;
return 0;
}
UDTSOCKET serv = UDT::socket(res->ai_family, res->ai_socktype, res->ai_protocol);
// UDT Options
//UDT::setsockopt(serv, 0, UDT_CC, new CCCFactory<CUDPBlast>, sizeof(CCCFactory<CUDPBlast>));
//UDT::setsockopt(serv, 0, UDT_MSS, new int(9000), sizeof(int));
//UDT::setsockopt(serv, 0, UDT_RCVBUF, new int(10000000), sizeof(int));
//UDT::setsockopt(serv, 0, UDP_RCVBUF, new int(10000000), sizeof(int));
if (UDT::ERROR == UDT::bind(serv, res->ai_addr, res->ai_addrlen)) {
cout << "bind: " << UDT::getlasterror().getErrorMessage() << endl;
return 0;
}
freeaddrinfo(res);
cout << "server is ready at port: " << service << endl;
if (UDT::ERROR == UDT::listen(serv, 10)) {
cout << "listen: " << UDT::getlasterror().getErrorMessage() << endl;
return 0;
}
sockaddr_storage clientaddr;
int addrlen = sizeof(clientaddr);
UDTSOCKET recver;
while (true) {
if (UDT::INVALID_SOCK == (recver = UDT::accept(serv, (sockaddr*) &clientaddr, &addrlen))) {
cout << "accept: " << UDT::getlasterror().getErrorMessage() << endl;
return 0;
}
char clienthost[NI_MAXHOST];
char clientservice[NI_MAXSERV];
getnameinfo((sockaddr *) &clientaddr, addrlen, clienthost, sizeof(clienthost), clientservice,
sizeof(clientservice), NI_NUMERICHOST | NI_NUMERICSERV);
cout << "new connection: " << clienthost << ":" << clientservice << endl;
pthread_t rcvthread;
pthread_create(&rcvthread, NULL, recvdata, new UDTSOCKET(recver));
pthread_detach(rcvthread);
}
UDT::close(serv);
return 0;
}
void* recvdata(void* usocket) {
UDTSOCKET recver = *(UDTSOCKET*) usocket;
delete (UDTSOCKET*) usocket;
char* data;
int size = 100000;
data = new char[size];
while (true) {
int rsize = 0;
int rs;
while (rsize < size) {
int rcv_size;
int var_size = sizeof(int);
UDT::getsockopt(recver, 0, UDT_RCVDATA, &rcv_size, &var_size);
if (UDT::ERROR == (rs = UDT::recv(recver, data + rsize, size - rsize, 0))) {
cout << "recv:" << UDT::getlasterror().getErrorMessage() << endl;
break;
}
rsize += rs;
}
if (rsize < size)
break;
}
delete[] data;
UDT::close(recver);
return NULL;
}

  1. 如在 UDT协议实现分析——UDT初始化和销毁 一文中提到的, 在调用任何UDT API之前,需要首先调用UDT::startup()来对库做一个初始化,并在结束之后执行UDT::cleanup()做最后的清理。在这个示例中,是创建了一个helper类UDTUpDown来帮助做这些事情的。利用编程语言本身提供的构造-析够机制来对UDT进行初始化和销毁要比手动调用这些函数要可靠得多。
  2. 获得本地UDP端口的网络地址。
  3. 调用UDT::socket()函数创建一个UDT Socket。这个Socket是UDT抽象出来的一个逻辑的Socket,我们并不能利用这个Socket本身来收发数据。
  4. 调用UDT::bind()将创建的UDT Socket绑定到本地端口的网络地址。这一步将会把UDT的逻辑Socket与能够进行数据收发的系统UDP socket进行关联,自此我们就可以利用UDT Socket进行收发数据了。
  5. 调用UDT::listen()告诉UDT,把这个UDT Socket做为这个端口上的Listening Socket。一个UDP端口可以被多个UDT Socket复用,在调用UDT::listen()之后,UDT就知道,在有其它节点连接这个UDP端口时,需要把相关的连接请求消息发送到哪个UDT Socket的接收缓冲区了。
    对绑定到相同UDP端口的多个不同UDT Socket调用UDT::listen()时,UDT是如何处理的?我们知道,一个UDP端口上最多只能有一个listening Socket。
  6. 调用UDT::accept()函数等待其它节点的连接。其它节点连接时,这个函数返回另外一个单独的UDT Socket,以用于与发起连接的节点进行通信。
    UDT::accept()函数返回的UDT Socket会被绑定到另外的一个不同的UDP端口,还是会被绑定到listening Socket所绑定的UDP端口?
  7. 使用UDT::accept()返回的UDT Socket,利用UDT::recv()与UDT::send()等函数同发起连接的节点进行数据传输。
  8. 在不再需要UDT Socket时,调用UDT::close()函数关掉它,以释放资源。
    然后再来看下UDT Client的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#include <unistd.h>
#include <cstdlib>
#include <cstring>
#include <netdb.h>
#include <iostream>
#include <udt.h>
using namespace std;
void* monitor(void*);
struct UDTUpDown {
UDTUpDown() {
// use this function to initialize the UDT library
UDT::startup();
}
~UDTUpDown() {
// use this function to release the UDT library
UDT::cleanup();
}
};
int main(int argc, char* argv[]) {
if ((3 != argc) || (0 == atoi(argv[2]))) {
cout << "usage: appclient server_ip server_port" << endl;
return 0;
}
// Automatically start up and clean up UDT module.
UDTUpDown _udt_;
struct addrinfo hints, *local, *peer;
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_flags = AI_PASSIVE;
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
//hints.ai_socktype = SOCK_DGRAM;
if (0 != getaddrinfo(NULL, "9000", &hints, &local)) {
cout << "incorrect network address.\n" << endl;
return 0;
}
UDTSOCKET client = UDT::socket(local->ai_family, local->ai_socktype, local->ai_protocol);
// UDT Options
//UDT::setsockopt(client, 0, UDT_CC, new CCCFactory<CUDPBlast>, sizeof(CCCFactory<CUDPBlast>));
//UDT::setsockopt(client, 0, UDT_MSS, new int(9000), sizeof(int));
//UDT::setsockopt(client, 0, UDT_SNDBUF, new int(10000000), sizeof(int));
//UDT::setsockopt(client, 0, UDP_SNDBUF, new int(10000000), sizeof(int));
//UDT::setsockopt(client, 0, UDT_MAXBW, new int64_t(12500000), sizeof(int));
// for rendezvous connection, enable the code below
/*
UDT::setsockopt(client, 0, UDT_RENDEZVOUS, new bool(true), sizeof(bool));
if (UDT::ERROR == UDT::bind(client, local->ai_addr, local->ai_addrlen))
{
cout << "bind: " << UDT::getlasterror().getErrorMessage() << endl;
return 0;
}
*/
freeaddrinfo(local);
if (0 != getaddrinfo(argv[1], argv[2], &hints, &peer)) {
cout << "incorrect server/peer address. " << argv[1] << ":" << argv[2] << endl;
return 0;
}
// connect to the server, implict bind
if (UDT::ERROR == UDT::connect(client, peer->ai_addr, peer->ai_addrlen)) {
cout << "connect: " << UDT::getlasterror().getErrorMessage() << endl;
return 0;
}
freeaddrinfo(peer);
int size = 100000;
char* data = new char[size];
pthread_create(new pthread_t, NULL, monitor, &client);
for (int i = 0; i < 1000000; i++) {
int ssize = 0;
int ss;
while (ssize < size) {
if (UDT::ERROR == (ss = UDT::send(client, data + ssize, size - ssize, 0))) {
cout << "send:" << UDT::getlasterror().getErrorMessage() << endl;
break;
}
ssize += ss;
}
if (ssize < size)
break;
}
UDT::close(client);
delete[] data;
return 0;
}
void* monitor(void* s) {
UDTSOCKET u = *(UDTSOCKET*) s;
UDT::TRACEINFO perf;
cout << "SendRate(Mb/s)\tRTT(ms)\tCWnd\tPktSndPeriod(us)\tRecvACK\tRecvNAK" << endl;
while (true) {
sleep(1);
if (UDT::ERROR == UDT::perfmon(u, &perf)) {
cout << "perfmon: " << UDT::getlasterror().getErrorMessage() << endl;
break;
}
cout << perf.mbpsSendRate << "\t\t" << perf.msRTT << "\t" << perf.pktCongestionWindow << "\t"
<< perf.usPktSndPeriod << "\t\t\t" << perf.pktRecvACK << "\t" << perf.pktRecvNAK << endl;
}
return NULL;
}

可以看到UDT Client连接UDT Server并发送数据的过程大体如下:

  1. 同样需要在调用任何UDT API之前,先调用UDT::startup()来对库做初始化,并在结束之后执行UDT::cleanup()做最后的清理。这里同样使用helper类UDTUpDown来帮助做这些事情。

  2. 获得UDT Server的网络地址。

  3. 调用UDT::socket()函数创建一个UDT Socket。这个Socket可以与特定的本地地址绑定,也可以不绑定。如果绑定,则发送数据时的出口地址就是该端口,如果不绑定,出口地址则是一个不确定的值。

  4. 调用UDT::connect()连接UDT Server。

  5. 使用UDT Socket,利用UDT::recv()与UDT::send()等函数同UDT Server进行数据传输。

  6. 在不再需要UDT Socket时,调用UDT::close()函数关掉它,以释放资源。

UDT基本的收发数据的API的用法大体如上面所示。接着我们来看,这些函数是如何实现的。

UDT Socket的创建

无论是UDT Server要listening,还是UDT client要连接UDT Server,在调用UDT::startup()初始化UDT之后,首先要做的事情都是调用UDT::socket()创建UDT Socket了。我们就来看一下创建UDT Socket的过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
UDTSOCKET CUDTUnited::newSocket(int af, int type) {
if ((type != SOCK_STREAM) && (type != SOCK_DGRAM))
throw CUDTException(5, 3, 0);
CUDTSocket* ns = NULL;
try {
ns = new CUDTSocket;
ns->m_pUDT = new CUDT;
if (AF_INET == af) {
ns->m_pSelfAddr = (sockaddr*) (new sockaddr_in);
((sockaddr_in*) (ns->m_pSelfAddr))->sin_port = 0;
} else {
ns->m_pSelfAddr = (sockaddr*) (new sockaddr_in6);
((sockaddr_in6*) (ns->m_pSelfAddr))->sin6_port = 0;
}
} catch (...) {
delete ns;
throw CUDTException(3, 2, 0);
}
CGuard::enterCS(m_IDLock);
ns->m_SocketID = --m_SocketID;
CGuard::leaveCS(m_IDLock);
ns->m_Status = INIT;
ns->m_ListenSocket = 0;
ns->m_pUDT->m_SocketID = ns->m_SocketID;
ns->m_pUDT->m_iSockType = (SOCK_STREAM == type) ? UDT_STREAM : UDT_DGRAM;
ns->m_pUDT->m_iIPversion = ns->m_iIPversion = af;
ns->m_pUDT->m_pCache = m_pCache;
// protect the m_Sockets structure.
CGuard::enterCS(m_ControlLock);
try {
m_Sockets[ns->m_SocketID] = ns;
} catch (...) {
//failure and rollback
CGuard::leaveCS(m_ControlLock);
delete ns;
ns = NULL;
}
CGuard::leaveCS(m_ControlLock);
if (NULL == ns)
throw CUDTException(3, 2, 0);
return ns->m_SocketID;
}
UDTSOCKET CUDT::socket(int af, int type, int) {
if (!s_UDTUnited.m_bGCStatus)
s_UDTUnited.startup();
try {
return s_UDTUnited.newSocket(af, type);
} catch (CUDTException& e) {
s_UDTUnited.setError(new CUDTException(e));
return INVALID_SOCK;
} catch (bad_alloc&) {
s_UDTUnited.setError(new CUDTException(3, 2, 0));
return INVALID_SOCK;
} catch (...) {
s_UDTUnited.setError(new CUDTException(-1, 0, 0));
return INVALID_SOCK;
}
}
UDTSOCKET socket(int af, int type, int protocol) {
return CUDT::socket(af, type, protocol);
}

调用流程为,UDT::socket() -> CUDT::socket() -> CUDTUnited::newSocket()。

在CUDT::socket()中,我们看到,它会首先检查s_UDTUnited.m_bGCStatus,若发现UDT还没有初始化完成的话,则会调用s_UDTUnited.startup()进行初始化。这个地方对UDT状态s_UDTUnited.m_bGCStatus的检查没有问题,但在发现UDT没有初始化完成时调用s_UDTUnited.startup()似乎并不恰当。这个地方调用了s_UDTUnited.startup(),那与这次调用相对应的cleanup()又在哪调用了呢?显然在UDT内部是没有。若是调用cleanup()的职责总是在UDT的使用者,那倒不如在这个地方返回错误给调用者,完全让调用者来管理UDT的生命周期,以尽可能地避免资源泄漏。

创建UDT Socket的工作实际都在CUDTUnited::newSocket()函数中完成。可以看到,在这个函数中主要做了如下的这样一些事情:

  1. 创建一个CUDTSocket对象ns,并创建一个CUDT对象被ns->m_pUDT引用。初始化ns对象的Self网络地址,端口会被设置为0。在UDT中使用CUDTSocket和CUDT共同来描述一个Socket。每个UDT Socket都会有其相应的CUDTSocket对象和CUDT对象。

可以看一下CUDTSocket类的定义,来了解它都描述了UDT Socket的哪些属性(src/api.h):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class CUDTSocket {
public:
CUDTSocket();
~CUDTSocket();
UDTSTATUS m_Status; // current socket state
uint64_t m_TimeStamp; // time when the socket is closed
int m_iIPversion; // IP version
sockaddr* m_pSelfAddr; // pointer to the local address of the socket
sockaddr* m_pPeerAddr; // pointer to the peer address of the socket
UDTSOCKET m_SocketID; // socket ID
UDTSOCKET m_ListenSocket; // ID of the listener socket; 0 means this is an independent socket
UDTSOCKET m_PeerID; // peer socket ID
int32_t m_iISN; // initial sequence number, used to tell different connection from same IP:port
CUDT* m_pUDT; // pointer to the UDT entity
std::set<UDTSOCKET>* m_pQueuedSockets; // set of connections waiting for accept()
std::set<UDTSOCKET>* m_pAcceptSockets; // set of accept()ed connections
pthread_cond_t m_AcceptCond; // used to block "accept" call
pthread_mutex_t m_AcceptLock; // mutex associated to m_AcceptCond
unsigned int m_uiBackLog; // maximum number of connections in queue
int m_iMuxID; // multiplexer ID
pthread_mutex_t m_ControlLock; // lock this socket exclusively for control APIs: bind/listen/connect
private:
CUDTSocket(const CUDTSocket&);
CUDTSocket& operator=(const CUDTSocket&);
};

这个类只提供了构造和析构两个成员函数。还声明了私有的copy构造函数和赋值操作符函数,但没有定义它们,以避免类对象的复制。

可以再来看一下CUDTSocket的构造函数实现(src/api.h):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
CUDTSocket::CUDTSocket()
: m_Status(INIT),
m_TimeStamp(0),
m_iIPversion(0),
m_pSelfAddr(NULL),
m_pPeerAddr(NULL),
m_SocketID(0),
m_ListenSocket(0),
m_PeerID(0),
m_iISN(0),
m_pUDT(NULL),
m_pQueuedSockets(NULL),
m_pAcceptSockets(NULL),
m_AcceptCond(),
m_AcceptLock(),
m_uiBackLog(0),
m_iMuxID(-1) {
#ifndef WIN32
pthread_mutex_init(&m_AcceptLock, NULL);
pthread_cond_init(&m_AcceptCond, NULL);
pthread_mutex_init(&m_ControlLock, NULL);
#else
m_AcceptLock = CreateMutex(NULL, false, NULL);
m_AcceptCond = CreateEvent(NULL, false, false, NULL);
m_ControlLock = CreateMutex(NULL, false, NULL);
#endif
}

特别注意m_Status的初始化,该值被初始化为了INIT。从状态机的角度来看CUDTSocket,在它刚被new出来时,它处于INIT状态。

CUDT类有两个主要的职责,一是描述UDT Socket,包括所有的非静态成员变量和非静态成员函数,定义UDT Socket的大部分属性和所能提供操作;二是提供API,包括绝大部分的static成员函数,这些函数将调用者与UDT内部的实现连接起来。CUDT类这样的设计,明显违背了OO的SRP单一职责原则,这多少还是给代码的阅读带来了一定的障碍。再来看一下CUDT的构造函数实现(src/core.cpp):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
CUDT::CUDT() {
m_pSndBuffer = NULL;
m_pRcvBuffer = NULL;
m_pSndLossList = NULL;
m_pRcvLossList = NULL;
m_pACKWindow = NULL;
m_pSndTimeWindow = NULL;
m_pRcvTimeWindow = NULL;
m_pSndQueue = NULL;
m_pRcvQueue = NULL;
m_pPeerAddr = NULL;
m_pSNode = NULL;
m_pRNode = NULL;
// Initilize mutex and condition variables
initSynch();
// Default UDT configurations
m_iMSS = 1500;
m_bSynSending = true;
m_bSynRecving = true;
m_iFlightFlagSize = 25600;
m_iSndBufSize = 8192;
m_iRcvBufSize = 8192; //Rcv buffer MUST NOT be bigger than Flight Flag size
m_Linger.l_onoff = 1;
m_Linger.l_linger = 180;
m_iUDPSndBufSize = 65536;
m_iUDPRcvBufSize = m_iRcvBufSize * m_iMSS;
m_iSockType = UDT_STREAM;
m_iIPversion = AF_INET;
m_bRendezvous = false;
m_iSndTimeOut = -1;
m_iRcvTimeOut = -1;
m_bReuseAddr = true;
m_llMaxBW = -1;
m_pCCFactory = new CCCFactory<CUDTCC>;
m_pCC = NULL;
m_pCache = NULL;
// Initial status
m_bOpened = false;
m_bListening = false;
m_bConnecting = false;
m_bConnected = false;
m_bClosing = false;
m_bShutdown = false;
m_bBroken = false;
m_bPeerHealth = true;
m_ullLingerExpiration = 0;
}
void CUDT::initSynch() {
#ifndef WIN32
pthread_mutex_init(&m_SendBlockLock, NULL);
pthread_cond_init(&m_SendBlockCond, NULL);
pthread_mutex_init(&m_RecvDataLock, NULL);
pthread_cond_init(&m_RecvDataCond, NULL);
pthread_mutex_init(&m_SendLock, NULL);
pthread_mutex_init(&m_RecvLock, NULL);
pthread_mutex_init(&m_AckLock, NULL);
pthread_mutex_init(&m_ConnectionLock, NULL);
#else
m_SendBlockLock = CreateMutex(NULL, false, NULL);
m_SendBlockCond = CreateEvent(NULL, false, false, NULL);
m_RecvDataLock = CreateMutex(NULL, false, NULL);
m_RecvDataCond = CreateEvent(NULL, false, false, NULL);
m_SendLock = CreateMutex(NULL, false, NULL);
m_RecvLock = CreateMutex(NULL, false, NULL);
m_AckLock = CreateMutex(NULL, false, NULL);
m_ConnectionLock = CreateMutex(NULL, false, NULL);
#endif
}

都是成员变量的初始化,后续再来详细了解这些成员变量的作用。

  1. 为Socket分配SocketID,其值为CUDTUnited的m_SocketID递减的结果。m_SocketID在CUDTUnited的构造函数中初始化:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
CUDTUnited::CUDTUnited()
: m_Sockets(),
m_ControlLock(),
m_IDLock(),
m_SocketID(0),
m_TLSError(),
m_mMultiplexer(),
m_MultiplexerLock(),
m_pCache(NULL),
m_bClosing(false),
m_GCStopLock(),
m_GCStopCond(),
m_InitLock(),
m_iInstanceCount(0),
m_bGCStatus(false),
m_GCThread(),
m_ClosedSockets() {
// Socket ID MUST start from a random value
srand((unsigned int) CTimer::getTime());
m_SocketID = 1 + (int) ((1 << 30) * (double(rand()) / RAND_MAX));
#ifndef WIN32
pthread_mutex_init(&m_ControlLock, NULL);
pthread_mutex_init(&m_IDLock, NULL);
pthread_mutex_init(&m_InitLock, NULL);
#else
m_ControlLock = CreateMutex(NULL, false, NULL);
m_IDLock = CreateMutex(NULL, false, NULL);
m_InitLock = CreateMutex(NULL, false, NULL);
#endif
#ifndef WIN32
pthread_key_create(&m_TLSError, TLSDestroy);
#else
m_TLSError = TlsAlloc();
m_TLSLock = CreateMutex(NULL, false, NULL);
#endif
m_pCache = new CCache<CInfoBlock>;
}

m_SocketID的初始值是一个随机数。

  1. 初始化ns及它的CUDT对象的一些成员变量。特别注意ns->m_Status的赋值,这里该值被赋为了INIT。从状态机的角度来看待CUDTSocket,在执行UDT Socket创建结束执行时,它处于INIT状态下。

  2. 将ns放在std::map m_Sockets中。

  3. 将UDT socket的SocketID返回给调用者。

这个接口直接返回一个表示UDT Socket的类对象,而不是一个handle,以便于调用者在调用UDT API时无需每次都输入UDT Socket handle参数,这样的API设计相对于UDT的这种API设计,有什么样的优缺点?

UDT的错误处理机制

在前面UDT Server和UDT Client的demo程序中,我们有看到,所有的UDT API都会在出错时返回一个错误码,比如UDT::bind()、UDT::bind2()、UDT::listen()和UDT::connect()等返回值类型为int的函数,在出错时返回UDT::ERROR,UDT::socket()和UDT::accept()等返回值类型为UDTSOCKET的函数在出错时返回UDT::INVALID_SOCK。

UDT API的调用者在检测到它们返回了错误值时,通过调用UDT::getlasterror()函数来获取关于异常更加详细的信息。这里我们就来看一下这套机制的实现。

注意看CUDT::socket()函数的实现。在这个函数中,会将实际创建UDT Socket的任务委托给s_UDTUnited.newSocket(),但这个调用会被包在一个try-catch块中。在s_UDTUnited.newSocket()函数执行的过程中发生任何的异常,都会先被CUDT::socket()捕获。CUDT::socket()函数在捕获这些异常之后,会根据捕获的异常的类型创建不同的CUDTException对象,并通过调用CUDTUnited::setError()函数将该CUDTException对象设置给s_UDTUnited。

我们来看一下CUDTUnited::setError()函数的实现(src/api.cpp):

1
2
3
4
5
6
7
8
9
10
11
void CUDTUnited::setError(CUDTException* e) {
#ifndef WIN32
delete (CUDTException*) pthread_getspecific(m_TLSError);
pthread_setspecific(m_TLSError, e);
#else
CGuard tg(m_TLSLock);
delete (CUDTException*)TlsGetValue(m_TLSError);
TlsSetValue(m_TLSError, e);
m_mTLSRecord[GetCurrentThreadId()] = e;
#endif
}

在这个函数中,会首先取出线程局部存储变量m_TLSError中保存的本线程上一次创建的 CUDTException对象,将其delete掉,并将本次创建的CUDTException对象设置进去。CUDTUnited类定义中m_TLSError的声明(src/api.h):

1
2
3
4
5
6
7
8
9
10
11
12
private:
pthread_key_t m_TLSError; // thread local error record (last error)
#ifndef WIN32
static void TLSDestroy(void* e) {
if (NULL != e)
delete (CUDTException*) e;
}
#else
std::map<DWORD, CUDTException*> m_mTLSRecord;
void checkTLSValue();
pthread_mutex_t m_TLSLock;
#endif

在CUDTUnited构造函数中也可以看到对这个对象的初始化。 CUDTUnited::TLSDestroy()函数是m_TLSError的析构函数,在m_TLSError最后被销毁时,这个函数被调用,以便于释放搜有还未释放的资源。

再来看UDT API的调用者获取上一次发生的异常的函数UDT::getlasterror():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
CUDTException* CUDTUnited::getError() {
#ifndef WIN32
if (NULL == pthread_getspecific(m_TLSError))
pthread_setspecific(m_TLSError, new CUDTException);
return (CUDTException*) pthread_getspecific(m_TLSError);
#else
CGuard tg(m_TLSLock);
if(NULL == TlsGetValue(m_TLSError))
{
CUDTException* e = new CUDTException;
TlsSetValue(m_TLSError, e);
m_mTLSRecord[GetCurrentThreadId()] = e;
}
return (CUDTException*)TlsGetValue(m_TLSError);
#endif
}
CUDTException& CUDT::getlasterror() {
return *s_UDTUnited.getError();
}
ERRORINFO& getlasterror() {
return CUDT::getlasterror();
}

调用过程为UDT::getlasterror() -> CUDT::getlasterror() -> CUDTUnited::getError()。

主要就是从s_UDTUnited的线程局部存储变量m_TLSError中取出前面设置的本线程上次创建的CUDTException对象返回给调用者。

总结一下,UDT的使用者在调用UDT API时,UDT API会直接调用CUDT类对应的static API函数,在CUDT类的这些static API函数中会将做实际事情的工作委托给s_UDTUnited的相应函数,但这个委托调用会被包在一个try-catch block中。s_UDTUnited的函数在遇到异常情况时抛出异常,CUDT类的static API函数捕获异常,根据捕获到的异常的具体类型,创建不同的CUDTException对象设置给s_UDTUnited的线程局部存储变量m_TLSError中并向UDT API调用者返回错误码,UDT API的调用者检测到错误码后,通过UDT::getlasterror()获取存储在m_TLSError中的异常。

此处可以看到,CUDT提供的这一层API,一个比较重要的作用大概就是做异常处理了。

在UDT中,使用CUDTException来描述所有出现的异常。可以看一下这个类的定义(src/udt.h):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
class UDT_API CUDTException {
public:
CUDTException(int major = 0, int minor = 0, int err = -1);
CUDTException(const CUDTException& e);
virtual ~CUDTException();
// Functionality:
// Get the description of the exception.
// Parameters:
// None.
// Returned value:
// Text message for the exception description.
virtual const char* getErrorMessage();
// Functionality:
// Get the system errno for the exception.
// Parameters:
// None.
// Returned value:
// errno.
virtual int getErrorCode() const;
// Functionality:
// Clear the error code.
// Parameters:
// None.
// Returned value:
// None.
virtual void clear();
private:
int m_iMajor; // major exception categories
// 0: correct condition
// 1: network setup exception
// 2: network connection broken
// 3: memory exception
// 4: file exception
// 5: method not supported
// 6+: undefined error
int m_iMinor; // for specific error reasons
int m_iErrno; // errno returned by the system if there is any
std::string m_strMsg; // text error message
std::string m_strAPI; // the name of UDT function that returns the error
std::string m_strDebug; // debug information, set to the original place that causes the error
public:
// Error Code
static const int SUCCESS;
static const int ECONNSETUP;
static const int ENOSERVER;
static const int ECONNREJ;
static const int ESOCKFAIL;
static const int ESECFAIL;
static const int ECONNFAIL;
static const int ECONNLOST;
static const int ENOCONN;
static const int ERESOURCE;
static const int ETHREAD;
static const int ENOBUF;
static const int EFILE;
static const int EINVRDOFF;
static const int ERDPERM;
static const int EINVWROFF;
static const int EWRPERM;
static const int EINVOP;
static const int EBOUNDSOCK;
static const int ECONNSOCK;
static const int EINVPARAM;
static const int EINVSOCK;
static const int EUNBOUNDSOCK;
static const int ENOLISTEN;
static const int ERDVNOSERV;
static const int ERDVUNBOUND;
static const int ESTREAMILL;
static const int EDGRAMILL;
static const int EDUPLISTEN;
static const int ELARGEMSG;
static const int EINVPOLLID;
static const int EASYNCFAIL;
static const int EASYNCSND;
static const int EASYNCRCV;
static const int ETIMEOUT;
static const int EPEERERR;
static const int EUNKNOWN;
};

这个class主要通过Major错误码和Minor错误码来描述异常情况,如果是调用系统调用出错了,还会用Errno值。

具体看一下这个class的实现,特别是CUDTException::getErrorMessage()函数,来了解每一个错误码所代表的含义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
CUDTException::CUDTException(int major, int minor, int err)
: m_iMajor(major),
m_iMinor(minor) {
if (-1 == err)
#ifndef WIN32
m_iErrno = errno;
#else
m_iErrno = GetLastError();
#endif
else
m_iErrno = err;
}
CUDTException::CUDTException(const CUDTException& e)
: m_iMajor(e.m_iMajor),
m_iMinor(e.m_iMinor),
m_iErrno(e.m_iErrno),
m_strMsg() {
}
CUDTException::~CUDTException() {
}
const char* CUDTException::getErrorMessage() {
// translate "Major:Minor" code into text message.
switch (m_iMajor) {
case 0:
m_strMsg = "Success";
break;
case 1:
m_strMsg = "Connection setup failure";
switch (m_iMinor) {
case 1:
m_strMsg += ": connection time out";
break;
case 2:
m_strMsg += ": connection rejected";
break;
case 3:
m_strMsg += ": unable to create/configure UDP socket";
break;
case 4:
m_strMsg += ": abort for security reasons";
break;
default:
break;
}
break;
case 2:
switch (m_iMinor) {
case 1:
m_strMsg = "Connection was broken";
break;
case 2:
m_strMsg = "Connection does not exist";
break;
default:
break;
}
break;
case 3:
m_strMsg = "System resource failure";
switch (m_iMinor) {
case 1:
m_strMsg += ": unable to create new threads";
break;
case 2:
m_strMsg += ": unable to allocate buffers";
break;
default:
break;
}
break;
case 4:
m_strMsg = "File system failure";
switch (m_iMinor) {
case 1:
m_strMsg += ": cannot seek read position";
break;
case 2:
m_strMsg += ": failure in read";
break;
case 3:
m_strMsg += ": cannot seek write position";
break;
case 4:
m_strMsg += ": failure in write";
break;
default:
break;
}
break;
case 5:
m_strMsg = "Operation not supported";
switch (m_iMinor) {
case 1:
m_strMsg += ": Cannot do this operation on a BOUND socket";
break;
case 2:
m_strMsg += ": Cannot do this operation on a CONNECTED socket";
break;
case 3:
m_strMsg += ": Bad parameters";
break;
case 4:
m_strMsg += ": Invalid socket ID";
break;
case 5:
m_strMsg += ": Cannot do this operation on an UNBOUND socket";
break;
case 6:
m_strMsg += ": Socket is not in listening state";
break;
case 7:
m_strMsg += ": Listen/accept is not supported in rendezous connection setup";
break;
case 8:
m_strMsg += ": Cannot call connect on UNBOUND socket in rendezvous connection setup";
break;
case 9:
m_strMsg += ": This operation is not supported in SOCK_STREAM mode";
break;
case 10:
m_strMsg += ": This operation is not supported in SOCK_DGRAM mode";
break;
case 11:
m_strMsg += ": Another socket is already listening on the same port";
break;
case 12:
m_strMsg += ": Message is too large to send (it must be less than the UDT send buffer size)";
break;
case 13:
m_strMsg += ": Invalid epoll ID";
break;
default:
break;
}
break;
case 6:
m_strMsg = "Non-blocking call failure";
switch (m_iMinor) {
case 1:
m_strMsg += ": no buffer available for sending";
break;
case 2:
m_strMsg += ": no data available for reading";
break;
default:
break;
}
break;
case 7:
m_strMsg = "The peer side has signalled an error";
break;
default:
m_strMsg = "Unknown error";
}
// Adding "errno" information
if ((0 != m_iMajor) && (0 < m_iErrno)) {
m_strMsg += ": ";
#ifndef WIN32
char errmsg[1024];
if (strerror_r(m_iErrno, errmsg, 1024) == 0)
m_strMsg += errmsg;
#else
LPVOID lpMsgBuf;
FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, m_iErrno, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), (LPTSTR)&lpMsgBuf, 0, NULL);
m_strMsg += (char*)lpMsgBuf;
LocalFree(lpMsgBuf);
#endif
}
// period
#ifndef WIN32
m_strMsg += ".";
#endif
return m_strMsg.c_str();
}
int CUDTException::getErrorCode() const {
return m_iMajor * 1000 + m_iMinor;
}
void CUDTException::clear() {
m_iMajor = 0;
m_iMinor = 0;
m_iErrno = 0;
}
const int CUDTException::SUCCESS = 0;
const int CUDTException::ECONNSETUP = 1000;
const int CUDTException::ENOSERVER = 1001;
const int CUDTException::ECONNREJ = 1002;
const int CUDTException::ESOCKFAIL = 1003;
const int CUDTException::ESECFAIL = 1004;
const int CUDTException::ECONNFAIL = 2000;
const int CUDTException::ECONNLOST = 2001;
const int CUDTException::ENOCONN = 2002;
const int CUDTException::ERESOURCE = 3000;
const int CUDTException::ETHREAD = 3001;
const int CUDTException::ENOBUF = 3002;
const int CUDTException::EFILE = 4000;
const int CUDTException::EINVRDOFF = 4001;
const int CUDTException::ERDPERM = 4002;
const int CUDTException::EINVWROFF = 4003;
const int CUDTException::EWRPERM = 4004;
const int CUDTException::EINVOP = 5000;
const int CUDTException::EBOUNDSOCK = 5001;
const int CUDTException::ECONNSOCK = 5002;
const int CUDTException::EINVPARAM = 5003;
const int CUDTException::EINVSOCK = 5004;
const int CUDTException::EUNBOUNDSOCK = 5005;
const int CUDTException::ENOLISTEN = 5006;
const int CUDTException::ERDVNOSERV = 5007;
const int CUDTException::ERDVUNBOUND = 5008;
const int CUDTException::ESTREAMILL = 5009;
const int CUDTException::EDGRAMILL = 5010;
const int CUDTException::EDUPLISTEN = 5011;
const int CUDTException::ELARGEMSG = 5012;
const int CUDTException::EINVPOLLID = 5013;
const int CUDTException::EASYNCFAIL = 6000;
const int CUDTException::EASYNCSND = 6001;
const int CUDTException::EASYNCRCV = 6002;
const int CUDTException::ETIMEOUT = 6003;
const int CUDTException::EPEERERR = 7000;
const int CUDTException::EUNKNOWN = -1;

Done。

坚持原创技术分享,您的支持将鼓励我继续创作!